// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
//     https://opensource.org/licenses/Apache-2.0

#include "actor-sqlite.h"
#include "io-gate.h"

#include <workerd/util/autogate.h>
#include <workerd/util/capnp-mock.h>
#include <workerd/util/test.h>

#include <kj/debug.h>
#include <kj/test.h>

namespace workerd {
namespace {

static constexpr kj::Date oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date threeMs = 3 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date fourMs = 4 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date fiveMs = 5 * kj::MILLISECONDS + kj::UNIX_EPOCH;

template <typename T>
kj::Promise<T> eagerlyReportExceptions(kj::Promise<T> promise, kj::SourceLocation location = {}) {
  return promise.eagerlyEvaluate([location](kj::Exception&& e) -> T {
    KJ_LOG_AT(ERROR, location, e);
    kj::throwFatalException(kj::mv(e));
  });
}

// Expect that a synchronous result is returned.
template <typename T>
T expectSync(kj::OneOf<T, kj::Promise<T>> result, kj::SourceLocation location = {}) {
  KJ_SWITCH_ONEOF(result) {
    KJ_CASE_ONEOF(promise, kj::Promise<T>) {
      KJ_FAIL_ASSERT_AT(location, "result was unexpectedly asynchronous");
    }
    KJ_CASE_ONEOF(value, T) {
      return kj::mv(value);
    }
  }
  KJ_UNREACHABLE;
}

struct ActorSqliteTestOptions final {
  bool monitorOutputGate = true;
};

struct ActorSqliteTest final {
  kj::EventLoop loop;
  kj::WaitScope ws;

  OutputGate gate;
  kj::Own<const kj::Directory> vfsDir;
  SqliteDatabase::Vfs vfs;
  SqliteDatabase db;

  struct Call final {
    kj::String desc;
    kj::Own<kj::PromiseFulfiller<void>> fulfiller;
  };
  kj::Vector<Call> calls;

  struct ActorSqliteTestHooks final: public ActorSqlite::Hooks {
   public:
    explicit ActorSqliteTestHooks(ActorSqliteTest& parent): parent(parent) {}

    kj::Promise<void> scheduleRun(
        kj::Maybe<kj::Date> newAlarmTime, kj::Promise<void> priorTask) override {
      KJ_IF_SOME(h, parent.scheduleRunHandler) {
        return h(newAlarmTime);
      }
      auto desc = newAlarmTime.map([](auto& t) {
        return kj::str("scheduleRun(", t, ")");
      }).orDefault(kj::str("scheduleRun(none)"));
      auto [promise, fulfiller] = kj::newPromiseAndFulfiller<void>();
      parent.calls.add(Call{kj::mv(desc), kj::mv(fulfiller)});
      return kj::mv(promise);
    }

    ActorSqliteTest& parent;
  };
  kj::Maybe<kj::Function<kj::Promise<void>(kj::Maybe<kj::Date>)>> scheduleRunHandler;
  ActorSqliteTestHooks hooks = ActorSqliteTestHooks(*this);

  ActorSqlite actor;

  kj::Promise<void> gateBrokenPromise;
  kj::UnwindDetector unwindDetector;

  explicit ActorSqliteTest(ActorSqliteTestOptions options = {})
      : ws(loop),
        vfsDir(kj::newInMemoryDirectory(kj::nullClock())),
        vfs(*vfsDir),
        db(vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY),
        actor(kj::attachRef(db), gate, KJ_BIND_METHOD(*this, commitCallback), hooks),
        gateBrokenPromise(options.monitorOutputGate ? eagerlyReportExceptions(gate.onBroken())
                                                    : kj::Promise<void>(kj::READY_NOW)) {
    db.afterReset([this](SqliteDatabase& cbDb) {
      KJ_ASSERT(&db == &cbDb);
      KJ_ASSERT(actor.isCommitScheduled(),
          "actor should have a commit scheduled during afterReset() callback");
    });
  }

  ~ActorSqliteTest() noexcept(false) {
    if (!unwindDetector.isUnwinding()) {
      // Make sure if the output gate has been broken, the exception was reported. This is
      // important to report errors thrown inside flush(), since those won't otherwise propagate
      // into the test body.
      gateBrokenPromise.poll(ws);

      // Make sure there's no outstanding async work we haven't considered:
      pollAndExpectCalls({}, "unexpected calls at end of test");
    }
  }

  kj::Promise<void> commitCallback() {
    auto [promise, fulfiller] = kj::newPromiseAndFulfiller<void>();
    calls.add(Call{kj::str("commit"), kj::mv(fulfiller)});
    return kj::mv(promise);
  }

  // Polls the event loop, then asserts that the description of calls up to this point match the
  // expectation and returns their fulfillers.  Also clears the call log.
  //
  // TODO(cleanup): Is there a better way to do mocks?  capnp-mock looks nice, but seems a bit
  // heavyweight for this test.
  kj::Vector<kj::Own<kj::PromiseFulfiller<void>>> pollAndExpectCalls(
      std::initializer_list<kj::StringPtr> expCallDescs,
      kj::StringPtr message = ""_kj,
      kj::SourceLocation location = {}) {
    ws.poll();
    auto callDescs = KJ_MAP(c, calls) { return kj::str(c.desc); };
    KJ_ASSERT_AT(callDescs == heapArray(expCallDescs), location, kj::str(message));
    auto fulfillers = KJ_MAP(c, calls) { return kj::mv(c.fulfiller); };
    calls.clear();
    return kj::mv(fulfillers);
  }

  // A few driver methods for convenience.
  auto get(kj::StringPtr key, ActorCache::ReadOptions options = {}) {
    return actor.get(kj::str(key), options);
  }
  auto getAlarm(ActorCache::ReadOptions options = {}) {
    return actor.getAlarm(options);
  }
  auto put(kj::StringPtr key, kj::StringPtr value, ActorCache::WriteOptions options = {}) {
    return actor.put(kj::str(key), kj::heapArray(value.asBytes()), options);
  }
  auto putMultiple(
      kj::Array<ActorCache::KeyValuePair> pairs, ActorCache::WriteOptions options = {}) {
    return actor.put(kj::mv(pairs), options);
  }
  auto putMultipleExplicitTxn(
      kj::Array<ActorCache::KeyValuePair> pairs, ActorCache::WriteOptions options = {}) {
    auto txn = actor.startTransaction();
    txn->put(kj::mv(pairs), options);
    return txn->commit();
  }
  auto deleteMultiple(kj::Array<kj::String> keys, ActorCache::WriteOptions options = {}) {
    return actor.delete_(kj::mv(keys), options);
  }
  auto setAlarm(kj::Maybe<kj::Date> newTime, ActorCache::WriteOptions options = {}) {
    return actor.setAlarm(newTime, options);
  }
  auto sync() {
    return actor.onNoPendingFlush();
  }
};

KJ_TEST("initial alarm value is unset") {
  ActorSqliteTest test;

  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("can set and get alarm") {
  ActorSqliteTest test;

  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("check put multiple wraps operations in a transaction") {
  ActorSqliteTest test;

  // Let's deinit the autogate. This will enforce the old behavior where putMultiple would commit
  // some puts, until a single put failed.
  util::Autogate::deinitAutogate();

  kj::Vector<ActorCache::KeyValuePair> putKVs;
  putKVs.add(ActorCache::KeyValuePair{kj::str("foo"), kj::heapArray(kj::str("bar").asBytes())});

  // NoTxn test
  {
    // Check that we're in a NoTxn
    KJ_ASSERT(!test.actor.isCommitScheduled());
    test.putMultiple(putKVs.releaseAsArray());
    // During write, all NoTxn operations are wrapped in an ImplicitTxn.
    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    commitFulfiller->fulfill();
    KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
  }

  // ExplicitTxn test
  {
    putKVs.add(ActorCache::KeyValuePair{kj::str("foo2"), kj::heapArray(kj::str("bar2").asBytes())});
    KJ_ASSERT(!test.actor.isCommitScheduled());
    // Similar to the previous putMultiple, but wrapped in a transactionSync (ExplicitTxn)
    test.putMultipleExplicitTxn(putKVs.releaseAsArray());
    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    commitFulfiller->fulfill();
    KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo2"))) == kj::str("bar2").asBytes());
  }

  // ImplicitTxn test
  {
    // A single put will create an ImplicitTxn that we can use to wrap our putMultiple into.
    KJ_ASSERT(!test.actor.isCommitScheduled());
    test.put("baz", "bat");

    // By now, we should check there's a commit scheduled in a ImplicitTxn.
    KJ_ASSERT(test.actor.isCommitScheduled());
    putKVs.add(ActorCache::KeyValuePair{kj::str("foo3"), kj::heapArray(kj::str("bar3").asBytes())});
    test.putMultiple(putKVs.releaseAsArray());

    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("baz"))) == kj::str("bat").asBytes());
    KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo3"))) == kj::str("bar3").asBytes());
    commitFulfiller->fulfill();
  }
}

KJ_TEST("check put multiple wraps operations in a transaction") {
  ActorSqliteTest test;

  kj::Vector<ActorCache::KeyValuePair> putKVs;
  putKVs.add(ActorCache::KeyValuePair{kj::str("foo"), kj::heapArray(kj::str("bar").asBytes())});

  // NoTxn test
  {
    // Check that we're in a NoTxn
    KJ_ASSERT(!test.actor.isCommitScheduled());
    test.putMultiple(putKVs.releaseAsArray());
    // During write, all NoTxn operations are wrapped in an ImplicitTxn.
    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    commitFulfiller->fulfill();
    KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
  }

  // ExplicitTxn test
  {
    putKVs.add(ActorCache::KeyValuePair{kj::str("foo2"), kj::heapArray(kj::str("bar2").asBytes())});
    KJ_ASSERT(!test.actor.isCommitScheduled());
    // Similar to the previous putMultiple, but wrapped in a transactionSync (ExplicitTxn)
    test.putMultipleExplicitTxn(putKVs.releaseAsArray());
    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    commitFulfiller->fulfill();
    KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo2"))) == kj::str("bar2").asBytes());
  }

  // ImplicitTxn test
  {
    // A single put will create an ImplicitTxn that we can use to wrap our putMultiple into.
    KJ_ASSERT(!test.actor.isCommitScheduled());
    test.put("baz", "bat");

    // By now, we should check there's a commit scheduled in a ImplicitTxn.
    KJ_ASSERT(test.actor.isCommitScheduled());
    putKVs.add(ActorCache::KeyValuePair{kj::str("foo3"), kj::heapArray(kj::str("bar3").asBytes())});
    test.putMultiple(putKVs.releaseAsArray());

    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("baz"))) == kj::str("bat").asBytes());
    KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo3"))) == kj::str("bar3").asBytes());
    commitFulfiller->fulfill();
  }
}

KJ_TEST("check put multiple wraps operations in a transaction and rollback on error") {
  ActorSqliteTest test;

  // We expect that putMultiple is all or nothing, rolling back if a single put fails.

  kj::Vector<ActorCache::KeyValuePair> putKVs;

  // Add some regular key-value pairs that we know are supported
  putKVs.add(ActorCache::KeyValuePair{kj::str("foo"), kj::heapArray(kj::str("bar").asBytes())});
  putKVs.add(ActorCache::KeyValuePair{kj::str("foo2"), kj::heapArray(kj::str("bar2").asBytes())});
  putKVs.add(ActorCache::KeyValuePair{kj::str("foo3"), kj::heapArray(kj::str("bar3").asBytes())});

  // Now create a key that's too large. Should fail with  string or blob too big: SQLITE_TOOBIG
  auto tooLongKey = kj::heapString(2200000);
  tooLongKey.asArray().fill('a');
  // Add it to our KV array
  putKVs.add(
      ActorCache::KeyValuePair{kj::str(tooLongKey), kj::heapArray(kj::str("bar").asBytes())});

  // NoTxn test
  {
    // Check that we're in a NoTxn
    KJ_ASSERT(!test.actor.isCommitScheduled());
    try {
      test.putMultiple(putKVs.releaseAsArray());
      // During write, all NoTxn operations are wrapped in an ImplicitTxn.
      auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
      commitFulfiller->fulfill();
      KJ_UNREACHABLE;
    } catch (kj::Exception& e) {
      KJ_ASSERT(
          e.getDescription() == "expected false; jsg.Error: string or blob too big: SQLITE_TOOBIG");
    }
    KJ_ASSERT(expectSync(test.get(kj::str("foo"))) == nullptr);
    KJ_ASSERT(expectSync(test.get(kj::str("foo2"))) == nullptr);
    KJ_ASSERT(expectSync(test.get(kj::str("foo3"))) == nullptr);
  }

  // Reset the transaction state by going async, which will cause the ImplicitTxn to commit.
  {
    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    commitFulfiller->fulfill();
  }

  // ExplicitTxn test
  {
    KJ_ASSERT(!test.actor.isCommitScheduled());
    // Similar to the previous putMultiple, but wrapped in a transactionSync (ExplicitTxn)
    test.putMultipleExplicitTxn(putKVs.releaseAsArray());
    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    commitFulfiller->fulfill();
    KJ_ASSERT(expectSync(test.get(kj::str("foo"))) == nullptr);
    KJ_ASSERT(expectSync(test.get(kj::str("foo2"))) == nullptr);
    KJ_ASSERT(expectSync(test.get(kj::str("foo3"))) == nullptr);
  }

  // ImplicitTxn test
  {
    // A single put will create an ImplicitTxn that we can use to wrap our putMultiple into.
    KJ_ASSERT(!test.actor.isCommitScheduled());
    test.put("baz", "bat");

    // By now, we should check there's a commit scheduled in a ImplicitTxn.
    KJ_ASSERT(test.actor.isCommitScheduled());
    test.putMultiple(putKVs.releaseAsArray());

    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    // The single put succeeded, but the putMultiple did not.
    KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("baz"))) == kj::str("bat").asBytes());
    KJ_ASSERT(expectSync(test.get(kj::str("foo"))) == nullptr);
    KJ_ASSERT(expectSync(test.get(kj::str("foo2"))) == nullptr);
    KJ_ASSERT(expectSync(test.get(kj::str("foo3"))) == nullptr);
    commitFulfiller->fulfill();
  }
}

KJ_TEST("alarm write happens transactionally with storage ops") {
  ActorSqliteTest test;

  test.setAlarm(oneMs);
  test.put("foo", "bar");
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
}

KJ_TEST("storage op without alarm change does not wait on scheduler") {
  ActorSqliteTest test;

  test.put("foo", "bar");
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("alarm scheduling starts synchronously before implicit local db commit") {
  ActorSqliteTest test;

  // In workerd (unlike edgeworker), there is no remote storage, so there is no work done in
  // commitCallback(); the local db is considered durably stored after the synchronous sqlite
  // commit() call returns.  If a commit includes an alarm state change that requires scheduling
  // before the commit call, it needs to happen synchronously.  Since workerd synchronously
  // schedules alarms, we just need to ensure that the database is in a pre-commit state when
  // scheduleRun() is called.

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});

  bool startedScheduleRun = false;
  test.scheduleRunHandler = [&](kj::Maybe<kj::Date>) -> kj::Promise<void> {
    startedScheduleRun = true;

    KJ_EXPECT_THROW_MESSAGE(
        "cannot start a transaction within a transaction", test.db.run("BEGIN TRANSACTION"));

    return kj::READY_NOW;
  };

  test.setAlarm(oneMs);
  KJ_ASSERT(!startedScheduleRun);
  test.ws.poll();
  KJ_ASSERT(startedScheduleRun);

  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("alarm scheduling starts synchronously before explicit local db commit") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});

  bool startedScheduleRun = false;
  test.scheduleRunHandler = [&](kj::Maybe<kj::Date>) -> kj::Promise<void> {
    startedScheduleRun = true;

    // Not sure if there is a good way to detect savepoint presence without mutating the db state,
    // but this is sufficient to verify the test properties:

    // Verify that we are not within a nested savepoint.
    KJ_EXPECT_THROW_MESSAGE(
        "no such savepoint: _cf_savepoint_1", test.db.run("RELEASE _cf_savepoint_1"));

    // Verify that we are within the root savepoint.
    test.db.run("RELEASE _cf_savepoint_0");
    KJ_EXPECT_THROW_MESSAGE(
        "no such savepoint: _cf_savepoint_0", test.db.run("RELEASE _cf_savepoint_0"));

    // We don't actually care what happens in the test after this point, but it's slightly simpler
    // to re-add the savepoint to allow the test to complete cleanly:
    test.db.run("SAVEPOINT _cf_savepoint_0");

    return kj::READY_NOW;
  };

  {
    auto txn = test.actor.startTransaction();
    txn->setAlarm(oneMs, {});

    KJ_ASSERT(!startedScheduleRun);
    txn->commit();
    KJ_ASSERT(startedScheduleRun);

    test.pollAndExpectCalls({"commit"})[0]->fulfill();
  }

  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("alarm scheduling does not start synchronously before nested explicit local db commit") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});

  bool startedScheduleRun = false;
  test.scheduleRunHandler = [&](kj::Maybe<kj::Date>) -> kj::Promise<void> {
    startedScheduleRun = true;
    return kj::READY_NOW;
  };

  {
    auto txn1 = test.actor.startTransaction();

    {
      auto txn2 = test.actor.startTransaction();
      txn2->setAlarm(oneMs, {});

      txn2->commit();
      KJ_ASSERT(!startedScheduleRun);
    }

    txn1->commit();
    KJ_ASSERT(startedScheduleRun);

    test.pollAndExpectCalls({"commit"})[0]->fulfill();
  }

  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("synchronous alarm scheduling failure causes local db commit to throw synchronously") {
  ActorSqliteTest test({.monitorOutputGate = false});
  auto promise = test.gate.onBroken();

  auto getLocalAlarm = [&]() -> kj::Maybe<kj::Date> {
    auto query = test.db.run("SELECT value FROM _cf_METADATA WHERE key = 1");
    if (query.isDone() || query.isNull(0)) {
      return kj::none;
    } else {
      return kj::UNIX_EPOCH + query.getInt64(0) * kj::NANOSECONDS;
    }
  };

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});

  // Override scheduleRun handler with one that throws synchronously.
  bool startedScheduleRun = false;
  test.scheduleRunHandler = [&](kj::Maybe<kj::Date>) -> kj::Promise<void> {
    startedScheduleRun = true;
    // Must throw synchronously; returning an exception is insufficient.
    kj::throwFatalException(KJ_EXCEPTION(FAILED, "a_sync_fail"));
  };

  KJ_ASSERT(!promise.poll(test.ws));
  test.setAlarm(oneMs);

  // Expect that polling will attempt to commit the implicit transaction, which should
  // synchronously fail when attempting to call scheduleRun() before the db commit, and roll back the
  // local db state to the 2ms alarm.
  KJ_ASSERT(!startedScheduleRun);
  KJ_ASSERT(KJ_REQUIRE_NONNULL(getLocalAlarm()) == oneMs);
  test.ws.poll();
  KJ_ASSERT(startedScheduleRun);
  KJ_ASSERT(KJ_REQUIRE_NONNULL(getLocalAlarm()) == twoMs);

  KJ_ASSERT(promise.poll(test.ws));
  KJ_EXPECT_THROW_MESSAGE("a_sync_fail", promise.wait(test.ws));
}

KJ_TEST("can clear alarm") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  test.setAlarm(kj::none);
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();

  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("can set alarm twice") {
  ActorSqliteTest test;

  test.setAlarm(oneMs);
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
}

KJ_TEST("setting duplicate alarm is no-op") {
  ActorSqliteTest test;

  test.setAlarm(kj::none);
  test.pollAndExpectCalls({});

  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  test.setAlarm(oneMs);
  test.pollAndExpectCalls({});
}

KJ_TEST("tells alarm handler to cancel when committed alarm is empty") {
  ActorSqliteTest test;

  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    // We expect armAlarmHandler() to tell us to cancel the alarm.
    KJ_ASSERT(armResult.is<ActorCache::CancelAlarmHandler>());
    auto waitPromise = kj::mv(armResult.get<ActorCache::CancelAlarmHandler>().waitBeforeCancel);

    // We also expect the alarm cancellation to contain a scheduling request to delete the alarm,
    // to handle cases where alarm deletion was durably committed to the database, but a failure
    // occurred before the alarm deletion was conveyed to the alarm scheduler.
    KJ_ASSERT(!waitPromise.poll(test.ws));
    test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();
    KJ_ASSERT(waitPromise.poll(test.ws));
    waitPromise.wait(test.ws);
  }
}

KJ_TEST("tells alarm handler to reschedule when handler alarm is later than committed alarm") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  // Request handler run at 2ms.  Expect cancellation with rescheduling.
  auto armResult = test.actor.armAlarmHandler(twoMs, false);
  KJ_ASSERT(armResult.is<ActorSqlite::CancelAlarmHandler>());
  auto cancelResult = kj::mv(armResult.get<ActorSqlite::CancelAlarmHandler>());

  // Expect rescheduling was requested and that returned promise resolves after fulfillment.
  auto waitBeforeCancel = kj::mv(cancelResult.waitBeforeCancel);
  auto rescheduleFulfiller = kj::mv(test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]);
  KJ_ASSERT(!waitBeforeCancel.poll(test.ws));
  rescheduleFulfiller->fulfill();
  KJ_ASSERT(waitBeforeCancel.poll(test.ws));
  waitBeforeCancel.wait(test.ws);
}

KJ_TEST("tells alarm handler to reschedule when handler alarm is earlier than committed alarm") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

  // Expect that armAlarmHandler() tells caller to cancel after rescheduling completes.
  auto armResult = test.actor.armAlarmHandler(oneMs, false);
  KJ_ASSERT(armResult.is<ActorSqlite::CancelAlarmHandler>());
  auto cancelResult = kj::mv(armResult.get<ActorSqlite::CancelAlarmHandler>());

  // Expect rescheduling was requested and that returned promise resolves after fulfillment.
  auto waitBeforeCancel = kj::mv(cancelResult.waitBeforeCancel);
  auto rescheduleFulfiller = kj::mv(test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]);
  KJ_ASSERT(!waitBeforeCancel.poll(test.ws));
  rescheduleFulfiller->fulfill();
  KJ_ASSERT(waitBeforeCancel.poll(test.ws));
  waitBeforeCancel.wait(test.ws);
}

KJ_TEST("does not cancel handler when local db alarm state is later than scheduled alarm") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  test.setAlarm(twoMs);
  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
  }
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
}

KJ_TEST("does not cancel handler when local db alarm state is earlier than scheduled alarm") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

  test.setAlarm(oneMs);
  {
    auto armResult = test.actor.armAlarmHandler(twoMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
  }
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
}

KJ_TEST("getAlarm() returns null during handler") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
    test.pollAndExpectCalls({});

    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
  }
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();
}

KJ_TEST("alarm handler handle clears alarm when dropped with no writes") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
  }
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();
  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("alarm deleter does not clear alarm when dropped with writes") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
    test.setAlarm(twoMs);
  }
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();

  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
}

KJ_TEST("can cancel deferred alarm deletion during handler") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
    test.actor.cancelDeferredAlarmDeletion();
  }

  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("canceling deferred alarm deletion outside handler has no effect") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
  }
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();

  test.actor.cancelDeferredAlarmDeletion();

  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("canceling deferred alarm deletion outside handler edge case") {
  // Presumably harmless to cancel deletion if the client requests it after the handler ends but
  // before the event loop runs the commit code?  Trying to cancel deletion outside the handler is
  // a bit of a contract violation anyway -- maybe we should just assert against it?
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
  }
  test.actor.cancelDeferredAlarmDeletion();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();

  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("canceling deferred alarm deletion is idempotent") {
  // Not sure if important, but matches ActorCache behavior.
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
    test.actor.cancelDeferredAlarmDeletion();
    test.actor.cancelDeferredAlarmDeletion();
  }

  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("alarm handler cleanup succeeds when output gate is broken") {
  auto runWithSetup = [](auto testFunc) {
    ActorSqliteTest test({.monitorOutputGate = false});
    auto promise = test.gate.onBroken();

    // Initialize alarm state to 1ms.
    test.setAlarm(oneMs);
    test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
    test.pollAndExpectCalls({"commit"})[0]->fulfill();
    test.pollAndExpectCalls({});
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
    auto deferredDelete = kj::mv(armResult.get<ActorSqlite::RunAlarmHandler>().deferredDelete);

    // Break gate
    test.put("foo", "bar");
    test.pollAndExpectCalls({"commit"})[0]->reject(KJ_EXCEPTION(FAILED, "a_rejected_commit"));
    KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", promise.wait(test.ws));
    // Ensure taskFailed handler runs and notices brokenness:
    test.ws.poll();

    testFunc(test, kj::mv(deferredDelete));
  };

  // Here, we test that the deferred deleter destructor doesn't throw, both in the case when the
  // caller cancels deletion and when it does not cancel it:

  runWithSetup([](ActorSqliteTest& test, kj::Own<void> deferredDelete) {
    // In the case where the handler fails, we assume the caller will explicitly cancel deferred
    // alarm deletion:
    test.actor.cancelDeferredAlarmDeletion();

    // Dropping the DeferredAlarmDeleter should succeed -- that is, it should not throw here:
    { auto drop = kj::mv(deferredDelete); }
  });

  runWithSetup([](ActorSqliteTest& test, kj::Own<void> deferredDelete) {
    // In the case where the handler succeeds, the caller will not cancel deferred deletion before
    // dropping the DeferredAlarmDeleter.  Dropping the DeferredAlarmDeleter should still succeed,
    // even if the output gate happens to already be broken:
    { auto drop = kj::mv(deferredDelete); }
  });
}

KJ_TEST("handler alarm is not deleted when commit fails") {
  ActorSqliteTest test({.monitorOutputGate = false});

  auto promise = test.gate.onBroken();

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  {
    auto armResult = test.actor.armAlarmHandler(oneMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());

    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
  }
  test.pollAndExpectCalls({"commit"})[0]->reject(KJ_EXCEPTION(FAILED, "a_rejected_commit"));

  KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", promise.wait(test.ws));
}

KJ_TEST("setting earlier alarm persists alarm scheduling before db") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

  // Update alarm to be earlier.  We expect the alarm scheduling to be persisted before the db.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("setting later alarm persists db before alarm scheduling") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  // Update alarm to be later.  We expect the db to be persisted before the alarm scheduling.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();

  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
}

KJ_TEST("multiple set-earlier in-flight alarms wait for earliest before committing db") {
  ActorSqliteTest test;

  // Initialize alarm state to 5ms.
  test.setAlarm(fiveMs);
  test.pollAndExpectCalls({"scheduleRun(5ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs);

  // Gate is not blocked.
  auto gateWaitBefore = test.gate.wait();
  KJ_ASSERT(gateWaitBefore.poll(test.ws));

  // Update alarm to be earlier (4ms).  We expect the alarm scheduling to start.
  test.setAlarm(fourMs);
  auto fulfiller4Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(4ms)"})[0]);
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == fourMs);

  // Gate as-of 4ms update is blocked.
  auto gateWait4ms = test.gate.wait();
  KJ_ASSERT(!gateWait4ms.poll(test.ws));

  // While 4ms scheduling request is in-flight, update alarm to be even earlier (3ms).  We expect
  // the 4ms request to block the 3ms scheduling request.
  test.setAlarm(threeMs);
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == threeMs);

  // Gate as-of 3ms update is blocked.
  auto gateWait3ms = test.gate.wait();
  KJ_ASSERT(!gateWait3ms.poll(test.ws));

  // Update alarm to be even earlier (2ms).  We expect scheduling requests to still be blocked.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

  // Gate as-of 2ms update is blocked.
  auto gateWait2ms = test.gate.wait();
  KJ_ASSERT(!gateWait2ms.poll(test.ws));

  // Fulfill the 4ms request.  We expect the 2ms scheduling to start, because that is the current
  // alarm value.
  fulfiller4Ms->fulfill();
  auto fulfiller2Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]);
  test.pollAndExpectCalls({});

  // While waiting for 2ms request, update alarm time to be 1ms.  Expect scheduling to be blocked.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  // Gate as-of 1ms update is blocked.
  auto gateWait1ms = test.gate.wait();
  KJ_ASSERT(!gateWait1ms.poll(test.ws));

  // Fulfill the 2ms request.  We expect the 1ms scheduling to start.
  fulfiller2Ms->fulfill();
  auto fulfiller1Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]);
  test.pollAndExpectCalls({});

  // Fulfill the 1ms request.  We expect a single db commit to start (coalescing all previous db
  // commits together).
  fulfiller1Ms->fulfill();
  auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
  test.pollAndExpectCalls({});

  // We expect all earlier gates to be blocked until commit completes.
  KJ_ASSERT(!gateWait4ms.poll(test.ws));
  KJ_ASSERT(!gateWait3ms.poll(test.ws));
  KJ_ASSERT(!gateWait2ms.poll(test.ws));
  KJ_ASSERT(!gateWait1ms.poll(test.ws));
  commitFulfiller->fulfill();
  KJ_ASSERT(gateWait4ms.poll(test.ws));
  KJ_ASSERT(gateWait3ms.poll(test.ws));
  KJ_ASSERT(gateWait2ms.poll(test.ws));
  KJ_ASSERT(gateWait1ms.poll(test.ws));

  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("setting later alarm times does scheduling after db commit") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  // Gate is not blocked.
  auto gateWaitBefore = test.gate.wait();
  KJ_ASSERT(gateWaitBefore.poll(test.ws));

  // Set alarm to 2ms.  Expect 2ms db commit to start.
  test.setAlarm(twoMs);
  auto commit2MsFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
  test.pollAndExpectCalls({});

  // Gate as-of 2ms update is blocked.
  auto gateWait2Ms = test.gate.wait();
  KJ_ASSERT(!gateWait2Ms.poll(test.ws));

  // Set alarm to 3ms.  Expect 3ms db commit to start. The 2ms scheduleRun will never happen now
  // that we've overwritten it while it was persisting to SQLite (before we send an update to the
  // alarm manager).
  test.setAlarm(threeMs);
  auto commit3MsFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
  test.pollAndExpectCalls({});

  // Gate as-of 3ms update is blocked.
  auto gateWait3Ms = test.gate.wait();
  KJ_ASSERT(!gateWait3Ms.poll(test.ws));

  // Expect 2ms gate to be unblocked once the commit finishes, but don't expect a scheduleRun(2ms).
  KJ_ASSERT(!gateWait2Ms.poll(test.ws));
  commit2MsFulfiller->fulfill();
  KJ_ASSERT(gateWait2Ms.poll(test.ws));
  test.pollAndExpectCalls({});

  // Fulfill 3ms db commit.  Expect 3ms alarm to be scheduled and 3ms gate to be unblocked.
  KJ_ASSERT(!gateWait3Ms.poll(test.ws));

  commit3MsFulfiller->fulfill();
  KJ_ASSERT(gateWait3Ms.poll(test.ws));

  auto fulfiller3Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(3ms)"})[0]);
  test.pollAndExpectCalls({});

  fulfiller3Ms->fulfill();
}

KJ_TEST("rejected move-earlier alarm scheduling request breaks gate") {
  ActorSqliteTest test({.monitorOutputGate = false});

  auto promise = test.gate.onBroken();

  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->reject(
      KJ_EXCEPTION(FAILED, "a_rejected_scheduleRun"));

  KJ_EXPECT_THROW_MESSAGE("a_rejected_scheduleRun", promise.wait(test.ws));
}

KJ_TEST("rejected move-later alarm scheduling request does not break gate") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  // Update alarm to be later.  We expect the db to be persisted before the alarm scheduling.
  // We simulate a failure during the alarm rescheduling, but expect it to not break the output
  // gate.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->reject(
      KJ_EXCEPTION(FAILED, "a_rejected_scheduleRun"));

  // Subsequent kv put succeeds.  In an earlier version of the code, this failed, due to capturing
  // the scheduling failure as if it had broke the output gate, without actually breaking the
  // output gate.
  test.put("foo", "bar");
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
}

KJ_TEST("an exception thrown during merged commits does not hang") {
  ActorSqliteTest test({.monitorOutputGate = false});

  auto promise = test.gate.onBroken();

  // Initialize alarm state to 5ms.
  test.setAlarm(fiveMs);
  test.pollAndExpectCalls({"scheduleRun(5ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs);

  // Update alarm to be earlier (4ms).  We expect the alarm scheduling to start.
  test.setAlarm(fourMs);
  auto fulfiller4Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(4ms)"})[0]);
  auto gateWait4ms = test.gate.wait();

  // While 4ms scheduling request is in-flight, update alarm to be earlier (3ms).  We expect
  // the two commit requests to merge and be blocked on the alarm scheduling request.
  test.setAlarm(threeMs);
  test.pollAndExpectCalls({});
  auto gateWait3ms = test.gate.wait();

  // Reject the 4ms request.  We expect both gate waiting promises to unblock with exceptions.
  KJ_ASSERT(!gateWait4ms.poll(test.ws));
  KJ_ASSERT(!gateWait3ms.poll(test.ws));
  fulfiller4Ms->reject(KJ_EXCEPTION(FAILED, "a_rejected_scheduleRun"));
  KJ_ASSERT(gateWait4ms.poll(test.ws));
  KJ_ASSERT(gateWait3ms.poll(test.ws));

  KJ_EXPECT_THROW_MESSAGE("a_rejected_scheduleRun", gateWait4ms.wait(test.ws));
  KJ_EXPECT_THROW_MESSAGE("a_rejected_scheduleRun", gateWait3ms.wait(test.ws));
  KJ_EXPECT_THROW_MESSAGE("a_rejected_scheduleRun", promise.wait(test.ws));
}

KJ_TEST("getAlarm/setAlarm check for brokenness") {
  ActorSqliteTest test({.monitorOutputGate = false});

  auto promise = test.gate.onBroken();

  // Break gate
  test.put("foo", "bar");
  test.pollAndExpectCalls({"commit"})[0]->reject(KJ_EXCEPTION(FAILED, "a_rejected_commit"));

  KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", promise.wait(test.ws));

  // Apparently we don't actually set brokenness until the taskFailed handler runs, but presumably
  // this is OK?
  test.getAlarm();

  // Ensure taskFailed handler runs and notices brokenness:
  test.ws.poll();

  KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", test.getAlarm());
  KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", test.setAlarm(kj::none));
  test.pollAndExpectCalls({});
}

KJ_TEST("calling deleteAll() preserves alarm state if alarm is set") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  {
    KJ_ASSERT(!test.actor.isCommitScheduled());
    ActorCache::DeleteAllResults results = test.actor.deleteAll({});
    KJ_ASSERT(test.actor.isCommitScheduled());
    KJ_ASSERT(results.backpressure == kj::none);
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    KJ_ASSERT(results.count.wait(test.ws) == 0);
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

    commitFulfiller->fulfill();
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

    test.pollAndExpectCalls({});
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
  }

  {
    // Should be fine to call deleteAll() a few times in succession, too:
    KJ_ASSERT(!test.actor.isCommitScheduled());
    ActorCache::DeleteAllResults results1 = test.actor.deleteAll({});
    ActorCache::DeleteAllResults results2 = test.actor.deleteAll({});
    KJ_ASSERT(test.actor.isCommitScheduled());
    KJ_ASSERT(results1.backpressure == kj::none);
    KJ_ASSERT(results2.backpressure == kj::none);
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

    // Presumably fine to be performing the alarm state restoration after each db reset:
    auto commitFulfillers = test.pollAndExpectCalls({"commit", "commit"});
    KJ_ASSERT(results1.count.wait(test.ws) == 0);
    KJ_ASSERT(results2.count.wait(test.ws) == 0);
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

    commitFulfillers[0]->fulfill();
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
    commitFulfillers[1]->fulfill();
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

    test.pollAndExpectCalls({});
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
  }
}

KJ_TEST("calling deleteAll() preserves alarm state if alarm is not set") {
  ActorSqliteTest test;

  // Initialize alarm state to empty value in metadata table.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
  test.setAlarm(kj::none);
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);

  {
    KJ_ASSERT(!test.actor.isCommitScheduled());
    ActorCache::DeleteAllResults results = test.actor.deleteAll({});
    KJ_ASSERT(test.actor.isCommitScheduled());
    KJ_ASSERT(results.backpressure == kj::none);
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);

    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    KJ_ASSERT(results.count.wait(test.ws) == 0);
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);

    commitFulfiller->fulfill();
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);

    // We can also assert that we leave the database empty, in case that turns out to be useful later:
    auto q =
        test.db.run("SELECT name FROM sqlite_master WHERE type='table' AND name='_cf_METADATA'");
    KJ_ASSERT(q.isDone());
  }

  {
    // Should be fine to call deleteAll() a few times in succession, too:
    KJ_ASSERT(!test.actor.isCommitScheduled());
    ActorCache::DeleteAllResults results1 = test.actor.deleteAll({});
    ActorCache::DeleteAllResults results2 = test.actor.deleteAll({});
    KJ_ASSERT(test.actor.isCommitScheduled());
    KJ_ASSERT(results1.backpressure == kj::none);
    KJ_ASSERT(results2.backpressure == kj::none);
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);

    // Presumably fine that the deletion commits coalesce:
    auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
    KJ_ASSERT(results1.count.wait(test.ws) == 0);
    KJ_ASSERT(results2.count.wait(test.ws) == 0);
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);

    commitFulfiller->fulfill();
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);

    auto q =
        test.db.run("SELECT name FROM sqlite_master WHERE type='table' AND name='_cf_METADATA'");
    KJ_ASSERT(q.isDone());
  }
}

KJ_TEST("calling deleteAll() during an implicit transaction preserves alarm state") {
  ActorSqliteTest test;

  KJ_ASSERT(!test.actor.isCommitScheduled());

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);

  ActorCache::DeleteAllResults results = test.actor.deleteAll({});
  KJ_ASSERT(test.actor.isCommitScheduled());
  KJ_ASSERT(results.backpressure == kj::none);
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();

  auto commitFulfiller = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
  KJ_ASSERT(results.count.wait(test.ws) == 0);
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  commitFulfiller->fulfill();
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("rolling back transaction leaves alarm in expected state") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

  {
    auto txn = test.actor.startTransaction();
    KJ_ASSERT(expectSync(txn->getAlarm({})) == twoMs);
    txn->setAlarm(oneMs, {});
    KJ_ASSERT(expectSync(txn->getAlarm({})) == oneMs);
    // Dropping transaction without committing; should roll back.
  }
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
}

KJ_TEST("rolling back transaction leaves deferred alarm deletion in expected state") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

  {
    auto armResult = test.actor.armAlarmHandler(twoMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());

    auto txn = test.actor.startTransaction();
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
    test.setAlarm(oneMs);
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
    txn->rollback().wait(test.ws);

    // After rollback, getAlarm() still returns the deferred deletion result.
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);

    // After rollback, no changes committed, no change in scheduled alarm.
    test.pollAndExpectCalls({});
  }

  // After handler, 2ms alarm is deleted.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();
  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("committing transaction leaves deferred alarm deletion in expected state") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

  {
    auto armResult = test.actor.armAlarmHandler(twoMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());

    auto txn = test.actor.startTransaction();
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
    test.setAlarm(oneMs);
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
    txn->commit();

    // After commit, getAlarm() returns the committed value.
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
    test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
    test.pollAndExpectCalls({"commit"})[0]->fulfill();
    test.pollAndExpectCalls({});
  }

  // Alarm not deleted
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("rolling back nested transaction leaves deferred alarm deletion in expected state") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

  {
    auto armResult = test.actor.armAlarmHandler(twoMs, false);
    KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());

    auto txn1 = test.actor.startTransaction();
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
    {
      // Rolling back nested transaction change leaves deferred deletion in place.
      auto txn2 = test.actor.startTransaction();
      KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
      test.setAlarm(oneMs);
      KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
      txn2->rollback().wait(test.ws);
      KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
    }
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
    {
      // Committing nested transaction changes parent transaction state to dirty.
      auto txn3 = test.actor.startTransaction();
      KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
      test.setAlarm(oneMs);
      KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
      txn3->commit();
      KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
    }
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
    {
      // Nested transaction of dirty transaction is dirty, rollback has no effect.
      auto txn4 = test.actor.startTransaction();
      KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
      txn4->rollback().wait(test.ws);
      KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
    }
    KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
    txn1->rollback().wait(test.ws);

    // After root transaction rollback, getAlarm() still returns the deferred deletion result.
    KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);

    // After rollback, no changes committed, no change in scheduled alarm.
    test.pollAndExpectCalls({});
  }

  // After handler, 2ms alarm is deleted.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();
  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("database write operations check for brokenness") {
  ActorSqliteTest test({.monitorOutputGate = false});

  auto promise = test.gate.onBroken();

  // Break gate
  test.put("foo", "bar");
  test.pollAndExpectCalls({"commit"})[0]->reject(KJ_EXCEPTION(FAILED, "a_rejected_commit"));

  KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", promise.wait(test.ws));

  // We don't actually set ActorSqlite's brokenness until the taskFailed handler runs...
  test.ws.poll();

  // Try making a write operation to the database, expecting it to throw the broken message via
  // the onWrite handler:
  KJ_EXPECT_THROW_MESSAGE(
      "a_rejected_commit", test.db.run("CREATE TABLE IF NOT EXISTS counter (count INTEGER)"));
  test.pollAndExpectCalls({});
}

KJ_TEST("allowUnconfirmed put does not block output gate") {
  ActorSqliteTest test;

  // Gate is currently not blocked.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Do an unconfirmed put
  test.put("foo", "bar", {.allowUnconfirmed = true});

  // Gate still isn't blocked, because we set `allowUnconfirmed`.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should still not be blocked after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify data was written
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
}

KJ_TEST("confirmed put blocks output gate") {
  ActorSqliteTest test;

  // Gate is currently not blocked.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Do a confirmed put (default behavior)
  test.put("foo", "bar", {.allowUnconfirmed = false});

  // Now it should be blocked.
  KJ_ASSERT(!test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should unblock after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify data was written
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
}

KJ_TEST("mixed confirmed and unconfirmed writes in same transaction use output gate") {
  ActorSqliteTest test;

  // Gate is currently not blocked.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Do an unconfirmed put followed by a confirmed put in the same transaction batch
  test.put("foo", "bar", {.allowUnconfirmed = true});
  test.put("baz", "quux", {.allowUnconfirmed = false});

  // Since any write in the batch needs confirmation, the entire batch should use output gate
  KJ_ASSERT(!test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should unblock after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Both writes should be committed
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("baz"))) == kj::str("quux").asBytes());
}

KJ_TEST("allowUnconfirmed delete does not block output gate") {
  ActorSqliteTest test;

  // First set up some data
  test.put("foo", "bar");
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should be unblocked after setup
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Perform an unconfirmed delete - need to add delete helper method or use actor directly
  expectSync(test.actor.delete_(kj::str("foo"), {.allowUnconfirmed = true}));

  // Gate still isn't blocked, because we set `allowUnconfirmed`.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should still not be blocked after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Data should be deleted
  KJ_ASSERT(expectSync(test.get("foo")) == kj::none);
}

KJ_TEST("allowUnconfirmed putMultiple does not block output gate") {
  ActorSqliteTest test;

  // Gate should be unblocked at start
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Create multiple key-value pairs for the test
  kj::Vector<ActorCache::KeyValuePair> putKVs;
  putKVs.add(ActorCache::KeyValuePair{kj::str("foo"), kj::heapArray(kj::str("bar").asBytes())});
  putKVs.add(ActorCache::KeyValuePair{kj::str("baz"), kj::heapArray(kj::str("qux").asBytes())});
  putKVs.add(ActorCache::KeyValuePair{kj::str("key3"), kj::heapArray(kj::str("value3").asBytes())});

  // Perform an unconfirmed putMultiple within the implicit transaction
  test.putMultiple(putKVs.releaseAsArray(), {.allowUnconfirmed = true});

  // Gate still isn't blocked, because we set `allowUnconfirmed`.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should still not be blocked after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify all data was written correctly
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("baz"))) == kj::str("qux").asBytes());
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("key3"))) == kj::str("value3").asBytes());
}

KJ_TEST("allowUnconfirmed deleteMultiple does not block output gate") {
  ActorSqliteTest test;

  // First set up some data
  kj::Vector<ActorCache::KeyValuePair> putKVs;
  putKVs.add(ActorCache::KeyValuePair{kj::str("foo"), kj::heapArray(kj::str("bar").asBytes())});
  putKVs.add(ActorCache::KeyValuePair{kj::str("baz"), kj::heapArray(kj::str("qux").asBytes())});
  putKVs.add(ActorCache::KeyValuePair{kj::str("key3"), kj::heapArray(kj::str("value3").asBytes())});

  test.putMultiple(putKVs.releaseAsArray());
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should be unblocked after setup
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Create array of keys to delete
  kj::Vector<kj::String> deleteKeys;
  deleteKeys.add(kj::str("foo"));
  deleteKeys.add(kj::str("baz"));
  deleteKeys.add(kj::str("key3"));

  // Perform an unconfirmed deleteMultiple
  KJ_EXPECT(expectSync(
                test.deleteMultiple(deleteKeys.releaseAsArray(), {.allowUnconfirmed = true})) == 3);

  // Gate still isn't blocked, because we set `allowUnconfirmed`.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should still not be blocked after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify all data was deleted
  KJ_ASSERT(expectSync(test.get("foo")) == kj::none);
  KJ_ASSERT(expectSync(test.get("baz")) == kj::none);
  KJ_ASSERT(expectSync(test.get("key3")) == kj::none);
}

KJ_TEST("unconfirmed write failure still breaks output gate") {
  ActorSqliteTest test({.monitorOutputGate = false});

  auto promise = test.gate.onBroken();

  // Do an unconfirmed put
  test.put("foo", "bar", {.allowUnconfirmed = true});

  // The output gate is not applied initially.
  KJ_ASSERT(test.gate.wait().poll(test.ws));
  KJ_ASSERT(!promise.poll(test.ws));

  // Reject the commit to simulate failure
  test.pollAndExpectCalls({"commit"})[0]->reject(KJ_EXCEPTION(FAILED, "flush failed hard"));

  // Gate should be broken due to commit failure
  KJ_EXPECT_THROW_MESSAGE("flush failed hard", promise.wait(test.ws));
}

KJ_TEST("Direct SQL queries are confirmed writes") {
  ActorSqliteTest test;

  // Gate is currently not blocked.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  auto& db = KJ_ASSERT_NONNULL(test.actor.getSqliteDatabase());

  db.run("CREATE TABLE myTable (i INTEGER PRIMARY KEY, s TEXT)");
  db.run("INSERT INTO myTable VALUES (1, \"a\")");

  // Now the gate should be blocked.
  KJ_ASSERT(!test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should unblock after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Make sure that the write actually succeeded.
  {
    auto query = db.run("SELECT * FROM myTable");
    KJ_ASSERT(!query.isDone());
    KJ_EXPECT(query.getInt64(0) == 1);
    KJ_EXPECT(query.getText(1) == "a");
    query.nextRow();
    KJ_ASSERT(query.isDone());
  }
}

KJ_TEST("An unconfirmed put followed by a direct SQL queries requires the output gate") {
  ActorSqliteTest test;

  // Gate is currently not blocked.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  test.put("foo", "bar", {.allowUnconfirmed = true});
  auto& db = KJ_ASSERT_NONNULL(test.actor.getSqliteDatabase());
  db.run("CREATE TABLE myTable (i INTEGER PRIMARY KEY, s TEXT)");
  db.run("INSERT INTO myTable VALUES (1, \"a\")");

  // Now the gate should be blocked.
  KJ_ASSERT(!test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should unblock after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Make sure that the write actually succeeded.
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
  {
    auto query = db.run("SELECT * FROM myTable");
    KJ_ASSERT(!query.isDone());
    KJ_EXPECT(query.getInt64(0) == 1);
    KJ_EXPECT(query.getText(1) == "a");
    query.nextRow();
    KJ_ASSERT(query.isDone());
  }
}

KJ_TEST("sync() returns immediately when no writes are pending") {
  ActorSqliteTest test;

  // When there are no pending writes, sync() should return a resolved promise
  auto syncResult = test.sync();
  auto syncPromise = kj::mv(KJ_ASSERT_NONNULL(syncResult));
  KJ_ASSERT(syncPromise.poll(test.ws));
}

KJ_TEST("sync() waits for confirmed writes to complete") {
  ActorSqliteTest test;

  // Do a confirmed write (default behavior)
  test.put("foo", "bar", {.allowUnconfirmed = false});

  // sync() should return a promise that blocks until the commit completes
  auto syncResult = test.sync();
  KJ_ASSERT(syncResult != kj::none);

  auto syncPromise = kj::mv(KJ_ASSERT_NONNULL(syncResult));

  // The sync promise should not be ready yet
  KJ_ASSERT(!syncPromise.poll(test.ws));

  // Complete the commit
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Now the sync promise should be ready
  KJ_ASSERT(syncPromise.poll(test.ws));
  syncPromise.wait(test.ws);

  // Verify data was written
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
}

KJ_TEST("sync() waits for unconfirmed writes to complete") {
  ActorSqliteTest test;

  // Do an unconfirmed write
  test.put("foo", "bar", {.allowUnconfirmed = true});

  // sync() should still return a promise that blocks until the commit completes
  auto syncResult = test.sync();
  KJ_ASSERT(syncResult != kj::none);

  auto syncPromise = kj::mv(KJ_ASSERT_NONNULL(syncResult));

  // The sync promise should not be ready yet
  KJ_ASSERT(!syncPromise.poll(test.ws));

  // Complete the commit
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Now the sync promise should be ready
  KJ_ASSERT(syncPromise.poll(test.ws));
  syncPromise.wait(test.ws);

  // Verify data was written
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
}

KJ_TEST("sync() waits for multiple unconfirmed writes in a row") {
  ActorSqliteTest test;

  // Do multiple unconfirmed writes - they should batch into a single transaction
  test.put("foo", "bar", {.allowUnconfirmed = true});
  test.put("baz", "qux", {.allowUnconfirmed = true});
  test.put("key3", "value3", {.allowUnconfirmed = true});

  // sync() should wait for the batched commit
  auto syncResult = test.sync();
  KJ_ASSERT(syncResult != kj::none);
  auto syncPromise = kj::mv(KJ_ASSERT_NONNULL(syncResult));

  // The sync promise should not be ready yet
  KJ_ASSERT(!syncPromise.poll(test.ws));

  // Complete the single batched commit
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Now the sync promise should be ready
  KJ_ASSERT(syncPromise.poll(test.ws));
  syncPromise.wait(test.ws);

  // Verify all writes were committed
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("baz"))) == kj::str("qux").asBytes());
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("key3"))) == kj::str("value3").asBytes());
}

KJ_TEST("sync() only waits for writes before it was called") {
  ActorSqliteTest test;

  // First write
  test.put("foo", "bar", {.allowUnconfirmed = true});

  // Call sync for the first write
  auto syncResult = test.sync();
  KJ_ASSERT(syncResult != kj::none);
  auto syncPromise = kj::mv(KJ_ASSERT_NONNULL(syncResult));

  // Complete first commit
  auto firstCommit = kj::mv(test.pollAndExpectCalls({"commit"})[0]);
  firstCommit->fulfill();

  // First sync should complete
  KJ_ASSERT(syncPromise.poll(test.ws));
  syncPromise.wait(test.ws);

  // Second write after sync was called
  test.put("baz", "qux", {.allowUnconfirmed = true});

  // The original sync should still be complete (doesn't wait for new write)
  // To verify this, let's get a new sync that should wait for the second write
  auto syncResult2 = test.sync();
  KJ_ASSERT(syncResult2 != kj::none);
  auto syncPromise2 = kj::mv(KJ_ASSERT_NONNULL(syncResult2));

  // Second sync should not be ready
  KJ_ASSERT(!syncPromise2.poll(test.ws));

  // Complete second commit
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Now second sync should complete
  KJ_ASSERT(syncPromise2.poll(test.ws));
  syncPromise2.wait(test.ws);
}

KJ_TEST("sync() propagates commit errors") {
  ActorSqliteTest test({.monitorOutputGate = false});

  auto promise = test.gate.onBroken();

  // Do an unconfirmed write
  test.put("foo", "bar", {.allowUnconfirmed = true});

  // Call sync
  auto syncResult = test.sync();
  KJ_ASSERT(syncResult != kj::none);
  auto syncPromise = kj::mv(KJ_ASSERT_NONNULL(syncResult));

  // The sync promise should not be ready yet
  KJ_ASSERT(!syncPromise.poll(test.ws));

  // Reject the commit to simulate failure
  test.pollAndExpectCalls({"commit"})[0]->reject(KJ_EXCEPTION(FAILED, "commit failed"));

  // sync promise should become ready with an exception
  KJ_ASSERT(syncPromise.poll(test.ws));
  KJ_EXPECT_THROW_MESSAGE("commit failed", syncPromise.wait(test.ws));

  // Gate should also be broken
  KJ_EXPECT_THROW_MESSAGE("commit failed", promise.wait(test.ws));
}

KJ_TEST("sync() with mixed confirmed and unconfirmed writes") {
  ActorSqliteTest test;

  // Do an unconfirmed write followed by a confirmed write
  test.put("foo", "bar", {.allowUnconfirmed = true});
  test.put("baz", "qux", {.allowUnconfirmed = false});

  // sync() should wait for both writes
  auto syncResult = test.sync();
  KJ_ASSERT(syncResult != kj::none);
  auto syncPromise = kj::mv(KJ_ASSERT_NONNULL(syncResult));

  // The sync promise should not be ready yet
  KJ_ASSERT(!syncPromise.poll(test.ws));

  // Complete the commit (both writes are in the same transaction)
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Now the sync promise should be ready
  KJ_ASSERT(syncPromise.poll(test.ws));
  syncPromise.wait(test.ws);

  // Both writes should be committed
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("baz"))) == kj::str("qux").asBytes());
}

KJ_TEST("multiple sync() calls for same commit") {
  ActorSqliteTest test;

  // Do a write
  test.put("foo", "bar", {.allowUnconfirmed = true});

  // Call sync multiple times - they should all wait for the same commit
  auto syncResult1 = test.sync();
  auto syncResult2 = test.sync();
  auto syncResult3 = test.sync();

  KJ_ASSERT(syncResult1 != kj::none);
  KJ_ASSERT(syncResult2 != kj::none);
  KJ_ASSERT(syncResult3 != kj::none);

  auto syncPromise1 = kj::mv(KJ_ASSERT_NONNULL(syncResult1));
  auto syncPromise2 = kj::mv(KJ_ASSERT_NONNULL(syncResult2));
  auto syncPromise3 = kj::mv(KJ_ASSERT_NONNULL(syncResult3));

  // None should be ready yet
  KJ_ASSERT(!syncPromise1.poll(test.ws));
  KJ_ASSERT(!syncPromise2.poll(test.ws));
  KJ_ASSERT(!syncPromise3.poll(test.ws));

  // Complete the commit
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // All sync promises should become ready
  KJ_ASSERT(syncPromise1.poll(test.ws));
  KJ_ASSERT(syncPromise2.poll(test.ws));
  KJ_ASSERT(syncPromise3.poll(test.ws));

  syncPromise1.wait(test.ws);
  syncPromise2.wait(test.ws);
  syncPromise3.wait(test.ws);
}

KJ_TEST("allowUnconfirmed setAlarm does not block output gate") {
  ActorSqliteTest test;

  // Gate is currently not blocked.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Do an unconfirmed setAlarm
  test.setAlarm(oneMs, {.allowUnconfirmed = true});

  // Gate still isn't blocked, because we set `allowUnconfirmed`.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Complete the transaction - alarm scheduling happens before commit
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should still not be blocked after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify alarm was set
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("confirmed setAlarm blocks output gate") {
  ActorSqliteTest test;

  // Gate is currently not blocked.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Do a confirmed setAlarm (default behavior)
  test.setAlarm(oneMs, {.allowUnconfirmed = false});

  // Gate should be blocked after scheduling starts
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  KJ_ASSERT(!test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should unblock after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify alarm was set
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("allowUnconfirmed setAlarm then confirmed put uses output gate") {
  ActorSqliteTest test;

  // Gate is currently not blocked.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Do an unconfirmed setAlarm followed by a confirmed put in the same transaction batch
  test.setAlarm(oneMs, {.allowUnconfirmed = true});
  test.put("foo", "bar", {.allowUnconfirmed = false});

  // Since any write in the batch needs confirmation, the entire batch should use output gate
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  KJ_ASSERT(!test.gate.wait().poll(test.ws));

  // Complete the transaction.
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should unblock after commit completes
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Both operations should be committed
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
}

KJ_TEST("allowUnconfirmed setAlarm with storage ops") {
  ActorSqliteTest test;

  // Gate is currently not blocked.
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Do unconfirmed setAlarm with unconfirmed storage writes
  test.setAlarm(oneMs, {.allowUnconfirmed = true});
  test.put("foo", "bar", {.allowUnconfirmed = true});

  // Gate still isn't blocked since both operations are unconfirmed
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Complete the transaction
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should still not be blocked
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify both alarm and storage writes committed
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
  KJ_ASSERT(KJ_ASSERT_NONNULL(expectSync(test.get("foo"))) == kj::str("bar").asBytes());
}

KJ_TEST("allowUnconfirmed setAlarm updating existing alarm") {
  ActorSqliteTest test;

  // Initialize alarm state to 2ms.
  test.setAlarm(twoMs);
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);

  // Gate should be unblocked after setup
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Update alarm to earlier time with allowUnconfirmed
  test.setAlarm(oneMs, {.allowUnconfirmed = true});

  // Gate still isn't blocked
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Complete the transaction - when moving alarm earlier, schedule happens first
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();

  // Gate should still not be blocked
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify alarm was updated
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
}

KJ_TEST("allowUnconfirmed setAlarm to later time") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  // Gate should be unblocked after setup
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Update alarm to later time with allowUnconfirmed
  test.setAlarm(twoMs, {.allowUnconfirmed = true});

  // Gate still isn't blocked
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Complete the transaction - when moving alarm later, commit happens first
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();

  // Gate should still not be blocked
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify alarm was updated
  KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
}

KJ_TEST("allowUnconfirmed setAlarm to clear alarm") {
  ActorSqliteTest test;

  // Initialize alarm state to 1ms.
  test.setAlarm(oneMs);
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({});
  KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

  // Gate should be unblocked after setup
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Clear alarm with allowUnconfirmed
  test.setAlarm(kj::none, {.allowUnconfirmed = true});

  // Gate still isn't blocked
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Complete the transaction
  test.pollAndExpectCalls({"commit"})[0]->fulfill();
  test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();

  // Gate should still not be blocked
  KJ_ASSERT(test.gate.wait().poll(test.ws));

  // Verify alarm was cleared
  KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
}

KJ_TEST("unconfirmed setAlarm failure still breaks output gate") {
  ActorSqliteTest test({.monitorOutputGate = false});

  auto promise = test.gate.onBroken();

  // Do an unconfirmed setAlarm
  test.setAlarm(oneMs, {.allowUnconfirmed = true});

  // The output gate is not applied initially.
  KJ_ASSERT(test.gate.wait().poll(test.ws));
  KJ_ASSERT(!promise.poll(test.ws));

  // Fulfill scheduleRun but reject the commit to simulate failure
  test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
  test.pollAndExpectCalls({"commit"})[0]->reject(KJ_EXCEPTION(FAILED, "alarm commit failed"));

  // Gate should be broken due to commit failure
  KJ_EXPECT_THROW_MESSAGE("alarm commit failed", promise.wait(test.ws));
}

KJ_TEST("sync() throws after critical error in explicit transaction") {
  ActorSqliteTest test({.monitorOutputGate = false});

  // Start an explicit transaction
  auto txn = test.actor.startTransaction();

  // Do a write within the transaction
  txn->put(kj::str("foo"), kj::heapArray(kj::str("bar").asBytes()), {});

  // Trigger a critical error using SQLITE_NOMEM by setting a very low heap limit
  // and then trying to insert a large value.
  try {
    // Set SQLite's memory limit very low to trigger SQLITE_NOMEM
    test.db.run("PRAGMA hard_heap_limit=8192");  // 8KB limit

    // Create data that will exceed the memory limit
    auto largeData = kj::heapArray<byte>(50000, 'X');  // 50KB

    // This should trigger SQLITE_NOMEM, causing SQLite to auto-rollback the transaction,
    // which will trigger the critical error handler.
    //
    // We have to copy it again in order to convert to a Array<const byte> from an Array<byte>.
    txn->put(kj::str("large_key"), kj::heapArray<const byte>(largeData.asBytes()), /*options=*/{});
    KJ_FAIL_ASSERT("Query should have failed with SQLITE_NOMEM");
  } catch (kj::Exception& e) {
    // Expected: out of memory error. We catch and ignore this to continue the test.
    KJ_ASSERT(e.getDescription().contains("out of memory"));
  }

  // sync() should also throw an exception because the storage is now broken
  auto syncResult = test.sync();
  KJ_ASSERT(syncResult != kj::none);
  auto syncPromise = kj::mv(KJ_ASSERT_NONNULL(syncResult));
  KJ_EXPECT_THROW_MESSAGE("broken", syncPromise.wait(test.ws));

  // The transaction is now in a broken state due to the critical error.
  // Attempting to commit should fail.
  KJ_EXPECT_THROW_MESSAGE("broken", txn->commit());
}

}  // namespace
}  // namespace workerd
